{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.sql import SQLContext\n",
    "\n",
    "# create spark contexts\n",
    "sc = pyspark.SparkContext()\n",
    "sqlContext = SQLContext(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "from pyspark.sql.types import StringType\n",
    "import preproc as pp\n",
    "# Register all the functions in Preproc with Spark Context\n",
    "check_lang_udf = udf(pp.check_lang, StringType())\n",
    "remove_stops_udf = udf(pp.remove_stops, StringType())\n",
    "remove_features_udf = udf(pp.remove_features, StringType())\n",
    "tag_and_remove_udf = udf(pp.tag_and_remove, StringType())\n",
    "lemmatize_udf = udf(pp.lemmatize, StringType())\n",
    "check_blanks_udf = udf(pp.check_blanks, StringType())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- text: string (nullable = true)\n",
      " |-- id: string (nullable = true)\n",
      " |-- label: double (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Load a text file and convert each line to a Row.\n",
    "data_rdd = sc.textFile(\"data/nlpdata/raw_classified.txt\")\n",
    "parts_rdd = data_rdd.map(lambda l: l.split(\"\\t\"))\n",
    "# Filter bad rows out\n",
    "garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)\n",
    "typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))\n",
    "#Create DataFrame\n",
    "data_df = sqlContext.createDataFrame(typed_rdd, [\"text\", \"id\", \"label\"])\n",
    "#data_df.show()\n",
    "data_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+------------------+-----+\n",
      "|                text|                id|label|\n",
      "+--------------------+------------------+-----+\n",
      "|Fresh install of ...|        1018769417|  1.0|\n",
      "|Well. Now I know ...|       10284216536|  1.0|\n",
      "|\"Literally six we...|       10298589026|  1.0|\n",
      "|Mitsubishi i MiEV...|109017669432377344|  1.0|\n",
      "+--------------------+------------------+-----+\n",
      "only showing top 4 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "data_df.show(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# predict language and filter out those with less than 90% chance of being English\n",
    "lang_df = data_df.withColumn(\"lang\", check_lang_udf(data_df[\"text\"]))\n",
    "en_df = lang_df.filter(lang_df[\"lang\"] == \"en\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- text: string (nullable = true)\n",
      " |-- id: string (nullable = true)\n",
      " |-- label: double (nullable = true)\n",
      " |-- lang: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "en_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+------------------+-----+----+\n",
      "|                text|                id|label|lang|\n",
      "+--------------------+------------------+-----+----+\n",
      "|RT @goeentertain:...|665305154954989568|  1.0|  en|\n",
      "|Teforia Uses Mach...|660668007975268352|  1.0|  en|\n",
      "|   Apple TV or Roku?|       25842461136|  1.0|  en|\n",
      "|Finished http://t...|        9412369614|  1.0|  en|\n",
      "+--------------------+------------------+-----+----+\n",
      "only showing top 4 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "en_df.show(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# remove stop words to reduce dimensionality\n",
    "rm_stops_df = en_df.withColumn(\"stop_text\", remove_stops_udf(en_df[\"text\"]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- text: string (nullable = true)\n",
      " |-- id: string (nullable = true)\n",
      " |-- label: double (nullable = true)\n",
      " |-- lang: string (nullable = true)\n",
      " |-- stop_text: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "rm_stops_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+------------------+-----+----+--------------------+\n",
      "|                text|                id|label|lang|           stop_text|\n",
      "+--------------------+------------------+-----+----+--------------------+\n",
      "|RT @goeentertain:...|665305154954989568|  1.0|  en|RT @goeentertain:...|\n",
      "|Teforia Uses Mach...|660668007975268352|  1.0|  en|Teforia Uses Mach...|\n",
      "|   Apple TV or Roku?|       25842461136|  1.0|  en|      Apple TV Roku?|\n",
      "|Finished http://t...|        9412369614|  1.0|  en|Finished http://t...|\n",
      "+--------------------+------------------+-----+----+--------------------+\n",
      "only showing top 4 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "rm_stops_df.show(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# remove other non essential words, think of it as my personal stop word list\n",
    "rm_features_df = rm_stops_df.withColumn(\"feat_text\", \\\n",
    "                                        remove_features_udf(rm_stops_df[\"stop_text\"]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- text: string (nullable = true)\n",
      " |-- id: string (nullable = true)\n",
      " |-- label: double (nullable = true)\n",
      " |-- lang: string (nullable = true)\n",
      " |-- stop_text: string (nullable = true)\n",
      " |-- feat_text: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "rm_features_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+------------------+-----+----+--------------------+--------------------+\n",
      "|                text|                id|label|lang|           stop_text|           feat_text|\n",
      "+--------------------+------------------+-----+----+--------------------+--------------------+\n",
      "|RT @goeentertain:...|665305154954989568|  1.0|  en|RT @goeentertain:...|  future blase   ...|\n",
      "|Teforia Uses Mach...|660668007975268352|  1.0|  en|Teforia Uses Mach...|teforia uses mach...|\n",
      "|   Apple TV or Roku?|       25842461136|  1.0|  en|      Apple TV Roku?|         apple  roku|\n",
      "|Finished http://t...|        9412369614|  1.0|  en|Finished http://t...|            finished|\n",
      "+--------------------+------------------+-----+----+--------------------+--------------------+\n",
      "only showing top 4 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "rm_features_df.show(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# tag the words remaining and keep only Nouns, Verbs and Adjectives\n",
    "tagged_df = rm_features_df.withColumn(\"tagged_text\", \\\n",
    "                                      tag_and_remove_udf(rm_features_df.feat_text))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- text: string (nullable = true)\n",
      " |-- id: string (nullable = true)\n",
      " |-- label: double (nullable = true)\n",
      " |-- lang: string (nullable = true)\n",
      " |-- stop_text: string (nullable = true)\n",
      " |-- feat_text: string (nullable = true)\n",
      " |-- tagged_text: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "tagged_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+------------------+-----+----+--------------------+--------------------+--------------------+\n",
      "|                text|                id|label|lang|           stop_text|           feat_text|         tagged_text|\n",
      "+--------------------+------------------+-----+----+--------------------+--------------------+--------------------+\n",
      "|RT @goeentertain:...|665305154954989568|  1.0|  en|RT @goeentertain:...|  future blase   ...| future blase vic...|\n",
      "|Teforia Uses Mach...|660668007975268352|  1.0|  en|Teforia Uses Mach...|teforia uses mach...| teforia uses mac...|\n",
      "|   Apple TV or Roku?|       25842461136|  1.0|  en|      Apple TV Roku?|         apple  roku|         apple roku |\n",
      "|Finished http://t...|        9412369614|  1.0|  en|Finished http://t...|            finished|           finished |\n",
      "+--------------------+------------------+-----+----+--------------------+--------------------+--------------------+\n",
      "only showing top 4 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "tagged_df.show(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# lemmatization of remaining words to reduce dimensionality & boost measures\n",
    "lemm_df = tagged_df.withColumn(\"lemm_text\", lemmatize_udf(tagged_df[\"tagged_text\"]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# lemmatization of remaining words to reduce dimensionality & boost measures\n",
    "lemm_df = tagged_df.withColumn(\"lemm_text\", lemmatize_udf(tagged_df[\"tagged_text\"]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- text: string (nullable = true)\n",
      " |-- id: string (nullable = true)\n",
      " |-- label: double (nullable = true)\n",
      " |-- lang: string (nullable = true)\n",
      " |-- stop_text: string (nullable = true)\n",
      " |-- feat_text: string (nullable = true)\n",
      " |-- tagged_text: string (nullable = true)\n",
      " |-- lemm_text: string (nullable = true)\n",
      " |-- is_blank: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# remove all rows containing only blank spaces\n",
    "check_blanks_df = lemm_df.withColumn(\"is_blank\", check_blanks_udf(lemm_df[\"lemm_text\"]))\n",
    "no_blanks_df = check_blanks_df.filter(check_blanks_df[\"is_blank\"] == \"False\")\n",
    "no_blanks_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# rename columns\n",
    "no_blanks_df = no_blanks_df.withColumn(\"text\",no_blanks_df.lemm_text)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# dedupe important since alot of the tweets only differed by url's and RT mentions\n",
    "dedup_df = no_blanks_df.dropDuplicates(['text', 'label'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# select only the columns we care about\n",
    "data_set = dedup_df.select('id', 'text','label')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+--------------------+-----+\n",
      "|                id|                text|label|\n",
      "+------------------+--------------------+-----+\n",
      "|        1546813742|              dragon|  1.0|\n",
      "|        1558492525|           hurt much|  1.0|\n",
      "|383221484023709697|seth blog word se...|  1.0|\n",
      "|660668007975268352|teforia use machi...|  1.0|\n",
      "+------------------+--------------------+-----+\n",
      "only showing top 4 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "data_set.show(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Split the data into training and test sets (30% held out for testing)\n",
    "(trainingData, testData) = data_set.randomSplit([0.6, 0.4])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import HashingTF, IDF, Tokenizer\n",
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.classification import NaiveBayes, RandomForestClassifier \n",
    "from pyspark.ml.classification import DecisionTreeClassifier\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
    "from pyspark.ml.tuning import ParamGridBuilder\n",
    "from pyspark.ml.tuning import CrossValidator\n",
    "from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and nb.\n",
    "tokenizer = Tokenizer(inputCol=\"text\", outputCol=\"words\")\n",
    "hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol=\"features\")\n",
    "idf = IDF(minDocFreq=3, inputCol=\"features\", outputCol=\"idf\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# \n",
    "nb = NaiveBayes()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Train model.  This also runs the indexers.\n",
    "model = pipeline.fit(trainingData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Make predictions.\n",
    "predictions = model.transform(testData)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----+----------+\n",
      "|                text|label|prediction|\n",
      "+--------------------+-----+----------+\n",
      "|           hurt much|  1.0|       1.0|\n",
      "|teforia use machi...|  1.0|       1.0|\n",
      "|              finish|  1.0|       1.0|\n",
      "|future blase vice...|  1.0|       1.0|\n",
      "|              divine|  1.0|       1.0|\n",
      "+--------------------+-----+----------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Select example rows to display.\n",
    "predictions.select(\"text\", \"label\", \"prediction\").show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0.912655971479501"
      ]
     },
     "execution_count": 32,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
    "evaluator = MulticlassClassificationEvaluator(predictionCol=\"prediction\")\n",
    "evaluator.evaluate(predictions)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cross Validation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 1.0]).build()\n",
    "# paramGrid = ParamGridBuilder().addGrid(rf.maxDepth,[4,8,10]).\\\n",
    "#                     addGrid(rf.impurity, ['entropy','gini']).build()\n",
    "\n",
    "\n",
    "# cv = CrossValidator(estimator=pipeline, \n",
    "#                     estimatorParamMaps=paramGrid, \n",
    "#                     evaluator=MulticlassClassificationEvaluator(), \n",
    "#                     numFolds=4)\n",
    "                    \n",
    "\n",
    "# #training_df.show(5)  \n",
    "# cvModel = cv.fit(training_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#prediction = cvModel.transform(test_df)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
